package com.dahuan.tables.udf;

import com.dahuan.bean.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

/**
 * Demonstrates a user-defined scalar function (UDF) with Flink's legacy (1.x)
 * Table API / SQL: reads sensor readings from a text file, converts them to a
 * table, and applies a custom {@code hashCode(id)} function both via the
 * Table API and via a SQL query.
 */
public class UDF_ScalarFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );
        // Use event time for this job (legacy API; default is processing time).
        env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create( env );

        // 2. Read the input file into a DataStream of raw CSV lines.
        DataStream<String> inputStream = env.readTextFile( "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt" );

        // 3. Parse each "id,timestamp,temperature" line into a POJO.
        //    Long.parseLong / Double.parseDouble instead of the deprecated
        //    boxing constructors new Long(...) / new Double(...); semantics
        //    (including NumberFormatException on bad input) are identical.
        DataStream<SensorReading> dataStream = inputStream.map( line -> {
            String[] fields = line.split( "," );
            return new SensorReading( fields[0], Long.parseLong( fields[1] ), Double.parseDouble( fields[2] ) );
        } );

        // 4. Convert the stream to a table; rename fields to avoid the
        //    reserved words "timestamp" and "temperature" clashes in SQL.
        Table sensorTable = tableEnv.fromDataStream( dataStream, "id, timestamp as ts, temperature as temp" );

        // 5. Custom scalar function computing a scaled hash of the id.
        // 5.1 Table API usage.
        HashCode hashCode = new HashCode( 23 );
        // The UDF must be registered with the table environment before use.
        tableEnv.registerFunction( "hashCode", hashCode );
        Table resultTable = sensorTable.select( "id,ts,hashCode(id)" );

        // 5.2 SQL usage — requires the table to be registered as a view first.
        tableEnv.createTemporaryView( "sensor", sensorTable );
        Table resultSql = tableEnv.sqlQuery( "select id,ts,hashCode(id) from sensor" );

        // Print both result streams. NOTE(review): the println below executes
        // while the job graph is being BUILT, i.e. before env.execute() runs
        // either pipeline, so it does not actually separate the two outputs
        // at runtime — the "TableAPI"/"sql" print prefixes do that instead.
        tableEnv.toAppendStream( resultTable, Row.class ).print("TableAPI");
        System.out.println("--------------------");
        tableEnv.toAppendStream( resultSql, Row.class ).print("sql");


        env.execute("UDF_ScalarFunction");
    }

    /**
     * Scalar UDF returning {@code str.hashCode() * factor}.
     * The eval method is resolved reflectively by Flink at runtime.
     */
    public static class HashCode extends ScalarFunction{

        // Multiplier applied to the raw hash; defaults to 13, overridden by ctor.
        private int factor = 13;

        /**
         * @param factor multiplier applied to each input's hash code
         */
        public HashCode(int factor){
            this.factor = factor;
        }

        /**
         * Invoked by Flink for each row.
         *
         * @param str input column value (assumed non-null; a null id would NPE)
         * @return the input's hash code scaled by {@code factor}
         */
        public int eval(String str){
            return  str.hashCode() * factor;
        }
    }
}
